package com.avris.tool.function;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.StrUtil;
import com.avris.tool.bean.Parameter;
import com.avris.tool.consumer.KafkaConsumerClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

import static com.avris.tool.util.ConsumerUtil.getOffset;
import static com.avris.tool.util.ConsumerUtil.setOffsetTime;

/**
 * Consumes a Kafka topic between a configured begin/end timestamp and appends
 * the matching record values to a local text file.
 *
 * @author Jast
 * @date 2022-09-19 15:29
 */
public class ConsumerForTime {

    /**
     * Resets the consumer group's offsets to {@code parameter.getBeginTime()}, then polls the
     * topic forever, appending each record's value to a timestamped {@code data-*.txt} file in the
     * working directory. Terminates the JVM (exit code 0) as soon as a record's timestamp exceeds
     * {@code parameter.getEndTime()}, or with exit code 1 when required parameters are missing.
     *
     * <p>Required parameters: brokerList, topic, group, beginTime. Optional: endTime and
     * content — a comma-separated keyword list; when set, only record values containing at
     * least one keyword are written.
     *
     * @param parameter runtime configuration (brokers, topic, group, time window, filter keywords)
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    public static void run(Parameter parameter) throws InterruptedException {
        Long beginTs = parameter.getBeginTime();
        String brokerList = parameter.getBrokerList();
        String topic = parameter.getTopic();
        String groupID = parameter.getGroup();
        Long endTs = parameter.getEndTime();
        String containsContent = parameter.getContent();
        if (StrUtil.isBlank(brokerList)
                || StrUtil.isBlank(topic)
                || StrUtil.isBlank(groupID)
                || Objects.isNull(beginTs)) {
            System.out.println("启动失败,请检查必填参数是否填写");
            System.exit(1);
        }

        String fileName = System.getProperty("user.dir") + File.separator + "data-" + System.currentTimeMillis() + ".txt";
        System.out.println("数据写入文件名称:" + fileName);

        // Print the current offsets before the reset so the operator can compare them afterwards.
        getOffset(brokerList, topic, groupID);

        System.out.println("开始修改消费位置，注意：修改之前要停止当前正在消费的消费者组，否则会修改失败");
        setOffsetTime(brokerList, groupID, topic, beginTs);
        getOffset(brokerList, topic, groupID);

        System.out.println("消费位置已修改完成,5秒后开始消费数据");
        Thread.sleep(5000);

        KafkaConsumerClient kafkaConsumerClient = new KafkaConsumerClient(brokerList);
        KafkaConsumer<String, String> consumer = kafkaConsumerClient.createConsumer(topic, groupID, 10, false);

        while (true) {
            // NOTE(review): poll(long) is deprecated since kafka-clients 2.0 — migrate to
            // poll(Duration.ofMillis(10000)) once the client version is confirmed to support it.
            ConsumerRecords<String, String> records = consumer.poll(10000);
            for (ConsumerRecord<String, String> record : records) {
                long timestamp = record.timestamp();
                if (endTs != null && timestamp > endTs) {
                    System.out.println("当前数据timestamp大于配置:" + timestamp + ",停止服务");
                    System.out.println("数据写入文件名称:" + fileName);
                    System.exit(0);
                }
                System.out.println("写入文件数据信息 -> " + record.partition() + "\t" + record.offset()
                        + "\t数据时间:" + DateUtil.date(timestamp));
                String data = record.value() + "\n";
                // Fix: the previous multi-keyword loop appended the same record once per matching
                // keyword; a record is now written at most once regardless of how many keywords match.
                if (shouldWrite(data, containsContent)) {
                    FileUtil.appendString(data, fileName, StandardCharsets.UTF_8);
                }
            }
            if (records.count() == 0) {
                // Idle poll: report current offsets and back off before polling again.
                getOffset(brokerList, topic, groupID);
                Thread.sleep(5000);
            }
            Thread.sleep(10);
        }
    }

    /**
     * Decides whether a record's value should be written to the output file: true when no
     * filter is configured, or when {@code data} contains at least one of the comma-separated
     * keywords in {@code containsContent}. (split(",") on a comma-free string yields the whole
     * string, so the single-keyword case needs no special handling.)
     */
    private static boolean shouldWrite(String data, String containsContent) {
        if (StrUtil.isBlank(containsContent)) {
            return true;
        }
        for (String keyword : containsContent.split(",")) {
            if (data.contains(keyword)) {
                return true;
            }
        }
        return false;
    }
}
